Person Re-Identification 複数カメラで人物の追跡をしてみました
1 はじめに
CX事業本部の平内(SIN)です。
人物や、顔の画像から特徴を出力し、その類似性で人物の追跡が可能です。
下記は、それぞれ、人物の画像と顔の画像で試してみたのです。
今回は、複数のカメラから撮影された映像で人物の追跡を試してみました。
最初に試してみた様子です。
動画は、EPFLのComputer Vision LaboratoryにあるMulti-camera pedestrians videoから利用させて頂きました。
2 モデル
仕組みとしては、下記の2つのモデルが使用されています。
- 人物検出モデル person-detection-retail-0013
- 個人識別モデル person-reidentification-retail-0270
モデル及び、その適用方法は、Person Re-Identificationで人物を追跡してみましたと同じですので、詳しくは、こちらをご参照下さい。
3 識別情報のデータベース
人物を検出し、その画像を個人識別のデータベースと照合し、類似性から固有のインデックスを取得するわけですが、そのデータベースを3つの動画で共有しています。データベースへの追加や、インデックス検出は、分野(動画の種類)に応じて、ロジックが変わりますが、今回は、以下のような内容になっています。
(1) バウンディングボックの大きさ
一定以上の大きさのバウンディングボックスのものだけを対象としています。遠くに離れて小さく写った人物は、そもそも検出も評価も、精度が低くなるはずなので、一定の大きさ以下は、無視するようになっています。
(2) 人物検出の信頼度
更新は、人物検出の信頼度が0.95以上のものだけを使用していま。最初の段階で人物検出が行われますが、スレッシュホールドレベルとして0.5以上の信頼度のもの表示対象としていますが、識別情報のデータベースの精度を上げるために、データベースの更新は、0.95以上を対象としています。
(3) 人物検出の重なり
更新は、バウンディックスボックスが重複のないものだけ使用しています。信頼度が、0.95以上であっても、他の検出と重複部分のあるデータは、個人の識別情報としては、やや汚染の危険性が有るということで、対象外としました。
(4) 人物検出の信頼度
人物検出の信頼度が高いデータを優先しています。識別情報のデータベースには、人物検出の信頼度も記録し、より高い信頼度のもの更新対象にしています。
(5) 類似度 0.9以上で更新
類似度が0.9以上で、かつ、上記の条件をクリアしたものだけが、データベースに保存されます。
(6) 類似度 0.15以下で新規
類似度が、0.15以下の場合、初めての人物が登場したと判断しデータベースに追加しています。 実は、この閾値が、一番重要だと思います。低く設定しすぎると、新しい人物が検出できなくなり、高くしすぎると、同じ人物の情報が複数生成されてしまい表示が乱れます。
4 コード
作成したコードです。
PersonDetector及び、PersonReidentificationは、それぞれ、人物検出と個人識別モデルを使用するクラスです。
Tracker内に、データベース化された識別情報が格納されており、getIds()でインデックスの取得及び、データベースの更新が行われています。
import numpy as np import time import random import cv2 from openvino.inference_engine import IEPlugin import subprocess import sys from model import Model class ParsonDetector(Model): def __init__(self, plugin, model_path, threshold, num_requests=2): super().__init__(plugin, model_path, num_requests, None) _, _, h, w = self.input_size self.__input_height = h self.__input_width = w self.__threshold = threshold def __prepare_frame(self, frame): initial_h, initial_w = frame.shape[:2] scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width) in_frame = cv2.resize(frame, (self.__input_width, self.__input_height)) in_frame = in_frame.transpose((2, 0, 1)) in_frame = in_frame.reshape(self.input_size) return in_frame, scale_h, scale_w def infer(self, frame): in_frame, _, _ = self.__prepare_frame(frame) result = super().infer(in_frame) detections = [] height, width = frame.shape[:2] for r in result[0][0]: conf = r[2] if(conf > self.__threshold): x1 = int(r[3] * width) y1 = int(r[4] * height) x2 = int(r[5] * width) y2 = int(r[6] * height) detections.append([x1, y1, x2, y2, conf]) return detections class ParsonReidentification(Model): def __init__(self, plugin, model_path, num_requests=2): super().__init__(plugin, model_path, num_requests, None) _, _, h, w = self.input_size self.__input_height = h self.__input_width = w def __prepare_frame(self, frame): initial_h, initial_w = frame.shape[:2] scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width) in_frame = cv2.resize(frame, (self.__input_width, self.__input_height)) in_frame = in_frame.transpose((2, 0, 1)) in_frame = in_frame.reshape(self.input_size) return in_frame, scale_h, scale_w def infer(self, frame): in_frame, _, _ = self.__prepare_frame(frame) result = super().infer(in_frame) return np.delete(result, 1) class Tracker: def __init__(self): # 識別情報のDB self.identifysDb = None # 人物検出の信頼度 self.conf = [] def __isOverlap(self, persons, index): [x1, y1, x2, y2, conf] = persons[index] for i, person in enumerate(persons): if(index == i): continue if(max(person[0], x1) <= min(person[2], x2) and max(person[1], y1) <= min(person[3], y2)): return True return False def getIds(self, identifys, persons): if(identifys.size==0): return [] if self.identifysDb is None: self.identifysDb = identifys for person in persons: self.conf.append(person[4]) print("input: {} DB:{}".format(len(identifys), len(self.identifysDb))) similaritys = self.__cos_similarity(identifys, self.identifysDb) similaritys[np.isnan(similaritys)] = 0 ids = np.nanargmax(similaritys, axis=1) for i, similarity in enumerate(similaritys): # DBの更新と追加は、人物検出の信頼度が0.95以上のものだけ if(persons[i][4] < 0.95): continue # DBの更新と追加は、バウンディングボックスに重なりがないものだけ if(self.__isOverlap(persons, i)): continue persionId = ids[i] print("persionId:{} {}".format(persionId,similarity[persionId])) # 0.9以上 if(similarity[persionId] > 0.9): # DBの更新は、信頼度が既存のものより高い場合だけ if(persons[i][4] > self.conf[persionId]): self.identifysDb[persionId] = identifys[i] # 0.15以下で新規に登録する elif(similarity[persionId] < 0.15): print("similarity:{}".format(similarity[persionId])) self.identifysDb = np.vstack((self.identifysDb, identifys[i])) self.conf.append(persons[i][4]) ids[i] = len(self.identifysDb) - 1 print("> append DB size:{}".format(len(self.identifysDb))) print(ids) # 重複がある場合は、信頼度の低い方を無効化する for i, a in enumerate(ids): for e, b in enumerate(ids): if(e == i): continue if(a == b): if(similarity[a] > similarity[b]): ids[i] = -1 else: ids[e] = -1 print(ids) return ids # コサイン類似度 # 参考にさせて頂きました: https://github.com/kodamap/person_reidentification def __cos_similarity(self, X, Y): m = X.shape[0] Y = Y.T return np.dot(X, Y) / ( np.linalg.norm(X.T, axis=0).reshape(m, 1) * np.linalg.norm(Y, axis=0) ) # MacOS device = "CPU" plugin_dirs = "/opt/intel/openvino/deployment_tools/inference_engine/lib/intel64" modelPath = "./FP32/" # RespberryPi p = subprocess.run (('uname', '-a'), stdout = subprocess.PIPE, stderr = subprocess.PIPE) uname = p.stdout.decode() if("armv7l GNU/Linux" in uname): device = "MYRIAD" plugin_dirs = "/opt/intel/openvino/inference_engine/lib/armv7l" modelPath = "./FP16/" plugin = IEPlugin(device=device, plugin_dirs = plugin_dirs) THRESHOLD= 0.5 person_detector = ParsonDetector(plugin, modelPath + "person-detection-retail-0013", THRESHOLD) personReidentification = ParsonReidentification(plugin, modelPath + "person-reidentification-retail-0270") tracker = Tracker() MOVIES = ["video/campus4-c0.mp4", "video/campus4-c1.mp4", "video/campus4-c2.mp4"] SCALE = 1.5 caps = [] for i in range(len(MOVIES)): caps.append(cv2.VideoCapture (MOVIES[i])) colors = [] colors.append((255,255,255)) colors.append((80,80,255)) colors.append((255,255,80)) colors.append((255,80,255)) colors.append((80,255,80)) colors.append((128,80,80)) colors.append((128,128,80)) colors.append((128,128,128)) frames = [] for i in range(len(MOVIES)): frames.append(None) while True: for i in range(len(MOVIES)): grabbed, frames[i] = caps[i].read() if not grabbed: break if not grabbed:# ループ再生 for cap in caps: cap.set(cv2.CAP_PROP_POS_FRAMES, 0) continue for frame in frames: # Personを検知する persons = [] detections = person_detector.infer(frame) if(len(detections) > 0): print("-------------------") for detection in detections: x1 = int(detection[0]) y1 = int(detection[1]) x2 = int(detection[2]) y2 = int(detection[3]) conf = detection[4] print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2)) h = y2- y1 if(h<50): print("? HEIGHT:{}".format(h)) else: print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2)) persons.append([x1, y1, x2, y2, conf]) print("====================") # 各Personの画像から識別情報を取得する identifys = np.zeros((len(persons), 255)) for i, person in enumerate(persons): # 各Personのimage取得 img = frame[person[1] : person[3], person[0]: person[2]] h, w = img.shape[:2] if(h==0 or w==0): continue # identification取得 identifys[i] = personReidentification.infer(img) # Idの取得 ids = tracker.getIds(identifys, persons) # 枠及びIdを画像に追加 for i, person in enumerate(persons): if(ids[i]!=-1): color = colors[int(ids[i])] frame = cv2.rectangle(frame, (person[0], person[1]), (person[2] ,person[3]), color, 2) frame = cv2.putText(frame, str(ids[i]), (person[0], person[1]), cv2.FONT_HERSHEY_PLAIN, 2, color, 1, cv2.LINE_AA ) # 画像の縮小 h, w = frames[0].shape[:2] for i, frame in enumerate(frames): frames[i] = cv2.resize(frame, ((int(w * SCALE), int(h * SCALE)))) m_h = cv2.hconcat([frames[0], frames[1], frames[2]]) cv2.imshow('frame2', m_h) if cv2.waitKey(1) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows()
こちらは、OpenVINOでコンピュータビジョン関連のモデルを使用する場合の、ベースとなるクラスです。
model.py
# # 下記のコードを参考にさせて頂きました。 # https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/python_demos/asl_recognition_demo/asl_recognition_demo/common.py # from openvino.inference_engine import IENetwork class Model: def __init__(self, plugin, model_path, num_requests, output_shape=None): if model_path.endswith((".xml", ".bin")): model_path = model_path[:-4] model = IENetwork(model_path + ".xml", model_path + ".bin") self.net = plugin.load(network=model) assert len(self.net.input_info) == 1, "One input is expected" self.input_name = next(iter(self.net.input_info)) if len(self.net.outputs) > 1: if output_shape is not None: candidates = [] for candidate_name in self.net.outputs: candidate_shape = self.exec_net.requests[0].output_blobs[candidate_name].buffer.shape if len(candidate_shape) != len(output_shape): continue matches = [src == trg or trg < 0 for src, trg in zip(candidate_shape, output_shape)] if all(matches): candidates.append(candidate_name) if len(candidates) != 1: raise Exception("One output is expected") self.output_name = candidates[0] else: raise Exception("One output is expected") else: self.output_name = next(iter(self.net.outputs)) self.input_size = self.net.input_info[self.input_name].input_data.shape self.output_size = self.net.requests[0].output_blobs[self.output_name].buffer.shape self.num_requests = num_requests def infer(self, data): input_data = {self.input_name: data} infer_result = self.net.infer(input_data) return infer_result[self.output_name]
5 最後に
今回は、別の角度から複数のカメラで撮影された動画で人物追跡を行うプログラムを試してみました。
複数カメラで追跡ができると、ちょっとPerson Re-Identificationの威力を感じました。
6 参考にさせて頂いたページ
Pedestrian Tracking Demo | OpenVINO™ toolkit | Ep. 23 | Intel Software
Pedestrian Tracker C++ Demo
OpenVINO の Person-reidentification(人再認識)モデルを使って人を追跡する
Github kodamap/person_reidentification
CVPR2019でPerson Re-Identificationの話をしている論文全てに1人で目を通す(加筆修正中)